-- lua-resty-rabbitmqstomp: Opinionated RabbitMQ (STOMP) client lib
-- Copyright (C) 2013 Rohit 'bhaisaab' Yadav, Wingify
-- Opensourced at Wingify in New Delhi under the MIT License
-- Service module for connecting to RabbitMQ over the STOMP protocol.

local byte = string.byte
local concat = table.concat
local error = error
local find = string.find
local gsub = string.gsub
local insert = table.insert
local len = string.len
local pairs = pairs
local setmetatable = setmetatable
local sub = string.sub

local skynet = require "skynet"
local socketchannel =  require "skynet.socketchannel"
-- Line feed; appended to NULL_BYTE to form the read delimiter used in dispatch.
local LF = "\x0a"
-- CRLF; terminates every line written by buildFrame.
local EOL = "\x0d\x0a"
-- NUL byte; written by buildFrame after the body and used as the read
-- delimiter in dispatch.
local NULL_BYTE = "\x00"

-- NOTE(review): `class` is neither defined nor required in this file;
-- presumably a global supplied by the host project -- confirm.
local RabbitMQ = class("RabbitMQ")


--- Assemble a STOMP frame as a single string.
-- Layout: the command line, one "key:value" line per header, a blank line,
-- the optional body, a NUL byte and a final EOL. Every line ends with EOL.
-- When a body is given, a "content-length" header holding its length is
-- emitted and supersedes any "content-length" entry supplied by the caller.
-- The caller's headers table is never modified, so it can be reused for
-- later frames without carrying over a stale length.
-- @param command STOMP command name, e.g. "SEND"
-- @param headers table of header name -> value; nil means no headers
-- @param body optional frame body
-- @return the serialized frame
function RabbitMQ:buildFrame(command, headers, body)
    local frame = {command, EOL}
    for key, value in pairs(headers or {}) do
        -- The length computed below replaces a caller-supplied one.
        if not (body and key == "content-length") then
            insert(frame, key)
            insert(frame, ":")
            -- tostring lets boolean header values through table.concat.
            insert(frame, tostring(value))
            insert(frame, EOL)
        end
    end
    if body then
        insert(frame, "content-length:")
        insert(frame, len(body))
        insert(frame, EOL)
    end
    -- Blank line separating headers from body.
    insert(frame, EOL)
    if body then
        insert(frame, body)
    end
    insert(frame, NULL_BYTE)
    insert(frame, EOL)
    return concat(frame)
end

--- Read one frame from the socket and hand its body to the subscription
-- callback.
-- Frames are read up to NUL followed by LF when opts.trailing_lf is nil or
-- true, otherwise up to NUL alone. Frames starting with "ERROR" are logged
-- through skynet.error and are then processed like any other frame.
-- The callback is skipped when none has been registered yet or when the
-- frame contains no blank line separating headers from body; the frame is
-- still returned in both cases.
-- @param socket object exposing readline(separator)
-- @return false when nothing was read; otherwise true followed by the raw frame
function RabbitMQ:dispatch(socket)
    local separator = NULL_BYTE
    local trailing_lf = self.opts.trailing_lf
    if trailing_lf == nil or trailing_lf == true then
        separator = NULL_BYTE .. LF
    end

    local frame = socket:readline(separator)
    if not frame then
        return false
    end

    if sub(frame, 1, len("ERROR")) == "ERROR" then
        skynet.error("rabbitmq error:", frame)
    end

    -- The body starts right after the first blank line. Plain search: the
    -- separator is a literal, not a pattern.
    local idx = find(frame, "\n\n", 2, true)
    local callback = self.subscribe_callback
    if idx and callback then
        callback(sub(frame, idx + 2))
    end

    return true, frame
end

--- Send an already serialized frame over the socket channel.
-- @param frame frame string, e.g. the result of buildFrame
-- @return whatever the channel's request call returns
function RabbitMQ:sendFrame(frame)
    local channel = self.socket
    return channel:request(frame)
end

--- Send the STOMP CONNECT frame; installed as the channel's auth hook by
-- connect. Login, passcode and host are taken from self.opts.
-- @param sc unused
-- @return result of sendFrame
function RabbitMQ:rabbitmq_login(sc)
    local opts = self.opts
    local connect_frame = self:buildFrame("CONNECT", {
        ["accept-version"] = "1.2",
        login = opts.username,
        passcode = opts.password,
        host = opts.vhost,
    }, nil)
    return self:sendFrame(connect_frame)
end

--- Create the socket channel to the broker and connect once.
-- Stores the options on self.opts and the channel on self.socket.
-- @param conf table with optional host (default "127.0.0.1"), port
--   (default 61613) and overload fields
-- @param opts table with username, password, vhost and trailing_lf; when
--   nil, guest/guest on vhost "/" with trailing_lf = true is used
function RabbitMQ:connect(conf, opts)
    local options = opts
    if options == nil then
        options = {
            username = "guest",
            password = "guest",
            vhost = "/",
            trailing_lf = true,
        }
    end
    self.opts = options

    local channel_conf = {
        -- rabbitmq_login is used as the channel's auth hook.
        auth = handler(self, self.rabbitmq_login),
        host = conf.host or "127.0.0.1",
        port = conf.port or 61613,
        nodelay = true,
        overload = conf.overload,
    }
    self.socket = socketchannel.channel(channel_conf)

    -- Connect once.
    self.socket:connect(true)
    print("____________连接成功____")
end

--- Publish a message with a SEND frame.
-- @param smsg message body
-- @param headers header table for the frame
-- @param callback forwarded unchanged as the second argument of the
--   channel's request call
-- @return whatever the channel's request call returns
function RabbitMQ:send(smsg, headers, callback)
    local channel = self.socket
    return channel:request(self:buildFrame("SEND", headers, smsg), callback)
end

--- Register the subscription callback and send a SUBSCRIBE frame.
-- The callback is stored on self.subscribe_callback, replacing any earlier
-- one; dispatch invokes it with the body of each frame it reads.
-- @param headers header table for the SUBSCRIBE frame
-- @param callback function receiving the frame body
-- @return result of sendFrame
function RabbitMQ:subscribe(headers, callback)
    self.subscribe_callback = callback
    local frame = self:buildFrame("SUBSCRIBE", headers)
    return self:sendFrame(frame)
end

--- Send an UNSUBSCRIBE frame.
-- @param headers header table for the UNSUBSCRIBE frame
-- @return result of sendFrame
function RabbitMQ:unsubscribe(headers)
    local frame = self:buildFrame("UNSUBSCRIBE", headers)
    return self:sendFrame(frame)
end

--- Receive loop: repeatedly asks the channel for a response, with dispatch
-- as the reader, for as long as self.socket is set.
-- self.socket is re-read after every frame so that close(), which clears
-- it, ends the loop. Errors raised by the channel's response call propagate
-- to the caller.
function RabbitMQ:receive()
    -- Build the bound reader once instead of once per frame.
    local on_frame = handler(self, self.dispatch)
    local channel = self.socket
    while channel do
        channel:response(on_frame)
        channel = self.socket
    end
end

--- Close the connection to the broker.
-- Sends a DISCONNECT frame carrying a "disconnect" receipt header, then
-- closes the socket channel and clears self.socket. Does nothing when there
-- is no channel, so it is safe to call before connect or more than once.
-- The channel is closed and cleared even when sending DISCONNECT fails; that
-- error is re-raised afterwards.
function RabbitMQ:close()
    local channel = self.socket
    if not channel then
        return
    end

    -- Graceful shutdown; pcall guarantees the cleanup below always runs.
    local ok, err = pcall(function()
        self:sendFrame(self:buildFrame("DISCONNECT", {receipt = "disconnect"}, nil))
    end)

    channel:close()
    self.socket = nil

    if not ok then
        -- Level 0 re-raises the original error value unchanged.
        error(err, 0)
    end
end


return RabbitMQ